1 module firecracker_d.core.transport;
2 import std.stdio;
3 import std.socket;
4 import requests;
5 import std.format;
6 import core.time;
7 import core.stdc.errno;
8 import core.stdc.string;
9 import std.conv;
10 
11 
/***
*  NetworkStream implementation backed by a Unix domain socket
*  (AddressFamily.UNIX / UnixAddress) instead of a TCP connection.
***/
class UnixStream : NetworkStream {
    private {
        // Not referenced by any method of this class; connect() and
        // readTimeout() each take their own `timeout` parameter.
        Duration timeout;
        Socket   s;               // underlying socket; null after close()
        bool     __isOpen;        // true while `s` refers to a created, not yet closed socket
        bool     __isConnected;   // true after a successful s.connect()
        string   _facSocket;      // socket path used by dg() for every stream it creates
        string   _bind;           // local path remembered by bind(string), applied in connect()
    }
    /***
    *  Create a fresh stream socket of the given address family.
    *  Any socket this stream still holds is closed first, so a repeated
    *  open() does not drop a live descriptor, and the open flag is raised
    *  only after the new socket has actually been created.
    ***/
    void open(AddressFamily fa) {
        close();
        s = new Socket(fa, SocketType.STREAM);
        __isOpen = true;
    }
    /// Underlying socket object (null when the stream is closed).
    @property Socket so() @safe pure {
        return s;
    }
    /// True when a socket exists and has not been closed.
    @property bool isOpen() @safe @nogc pure const {
        return s && __isOpen;
    }
    /// True when the socket exists, is open and connect() succeeded on it.
    @property bool isConnected() @safe @nogc pure const {
        return s && __isOpen && __isConnected;
    }
    /***
    *  Close the socket (if open), reset the open/connected flags and drop
    *  the socket reference. Safe to call on an already closed stream.
    ***/
    override void close() @trusted {
        debug(requests) tracef("Close socket");
        if ( isOpen ) {
            s.close();
            __isOpen = false;
            __isConnected = false;
        }
        s = null;
    }
    /***
    *  bind() just remembers the address. The actual bind() call happens at
    *  connect time, because there can be several connection attempts.
    ***/
    override void bind(string to) {
        _bind = to;
    }
    /***
    *  Connect to the Unix domain socket at path `host`.
    *
    *  Params:
    *      host    = filesystem path of the socket to connect to
    *      port    = not used for the connection itself; only echoed in error messages
    *      timeout = value applied as the socket send timeout (SNDTIMEO)
    *
    *  Returns: this stream, connected.
    *
    *  Throws: ConnectError when the address cannot be built or the
    *          connection attempt fails. The stream is left closed in that case.
    ***/
    NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) {
        Address[] addresses;
        __isConnected = false;
        try {
            addresses ~= new UnixAddress(host);
        } catch (Exception e) {
            throw new ConnectError("Can't resolve name when connect to %s:%d: %s".format(host, port, e.msg));
        }
        string lastError;
        foreach(a; addresses) {
            try {
                open(AddressFamily.UNIX);
                if ( _bind !is null ) {
                    auto ad = new UnixAddress(_bind);
                    debug(requests) writeln("bind to ", _bind);
                    s.bind(ad);
                }
                s.setOption(SocketOptionLevel.SOCKET, SocketOption.SNDTIMEO, timeout);
                s.connect(a);
                __isConnected = true;
                break;
            } catch (SocketException e) {
                // Remember why this attempt failed and reset the stream via
                // close(): it copes with `s` being null (open() itself may
                // have thrown) and keeps the state flags consistent.
                lastError = e.msg;
                close();
            }
        }
        if ( !__isConnected ) {
            auto msg = "Can't connect to %s:%d".format(host, port);
            if ( lastError !is null ) {
                msg ~= ": " ~ lastError;
            }
            throw new ConnectError(msg);
        }
        return this;
    }

    /***
    *  Send `buff` over the connected socket.
    *
    *  Returns: the value returned by Socket.send (number of bytes sent).
    *  Throws: NetworkException if the socket reports an error; the stream
    *          is closed before the exception is thrown.
    ***/
    override ptrdiff_t send(const(void)[] buff)
    in {assert(__isConnected);}
    body {
        auto rc = s.send(buff);
        if (rc < 0) {
            // Save errno first: close() may overwrite it.
            auto e = errno;
            close();
            throw new NetworkException("sending data: %s".format(to!string(strerror(e))));
        }
        return rc;
    }

    /***
    *  Receive data into `buff`.
    *
    *  Returns: the value returned by Socket.receive (bytes read; 0 is
    *           passed through unchanged).
    *  Throws: TimeoutException when the failed call left errno at EAGAIN
    *          (Posix) or 0 (Windows); NetworkException on any other error.
    *          The stream is closed before either exception is thrown.
    ***/
    ptrdiff_t receive(void[] buff) {
        while (true) {
            auto r = s.receive(buff);
            if (r < 0) {
                // Captured once, before close() gets a chance to change it.
                auto e = errno;
                version(Windows) {
                    close();
                    if ( e == 0 ) {
                        throw new TimeoutException("Timeout receiving data");
                    }
                    throw new NetworkException("Unexpected error %s while receiving data".format(to!string(strerror(e))));
                }
                version(Posix) {
                    if ( e == EINTR ) {
                        // Interrupted by a signal: retry the read.
                        continue;
                    }
                    close();
                    if ( e == EAGAIN ) {
                        throw new TimeoutException("Timeout receiving data");
                    }
                    throw new NetworkException("Unexpected error %s while receiving data".format(to!string(strerror(e))));
                }
            }
            return r;
        }
        assert(false);
    }

    /// Set the socket receive timeout (RCVTIMEO). Ignored unless connected.
    @property void readTimeout(Duration timeout) @safe {
        if ( __isConnected )
        {
            s.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, timeout);
        }
    }
    /// Not implemented: always fails with an assertion.
    override NetworkStream accept() {
        assert(false, "Implement before use");
    }
    /// Turn the REUSEADDR option on the underlying socket on or off.
    @property override void reuseAddr(bool yes){
        s.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, yes ? 1 : 0);
    }
    /// Bind the underlying socket to `addr` immediately.
    override void bind(Address addr){
        s.bind(addr);
    }
    /// Put the underlying socket into listening mode with backlog `n`.
    override void listen(int n) {
        s.listen(n);
    }

    /// Remember the socket path that dg() connects every new stream to.
    void setFactorySocket(string socket) {
        // `string` is immutable, so the reference can be stored without copying.
        _facSocket = socket;
    }

    /***
    *  Create a new UnixStream connected to the path given to
    *  setFactorySocket(). `scheme`, `host` and `port` are ignored.
    *  NOTE(review): the signature looks like a socket-factory delegate for
    *  the requests library - confirm against the caller.
    ***/
    NetworkStream dg(string scheme, string host, ushort port) {
        auto f = new UnixStream();
        f.connect(_facSocket, 0);
        return f;
    }

}